﻿using iTool.Common.Options;
using Orleans;
using Orleans.Concurrency;
using Orleans.Providers;
using Orleans.Runtime;
using Orleans.Storage;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Threading.Tasks;

namespace iTool.ClusterComponent
{
    /// <summary>
    /// Grain contract for a hash of string fields mapped to string values.
    /// </summary>
    public interface IHashStorageService : iToolServiceWithStringKey
    {
        // ---- Reads ----

        /// <summary>Returns the value stored under <paramref name="field"/>.</summary>
        /// <param name="field">Name of the field to look up.</param>
        Task<string> GetAsync(string field);

        /// <summary>Returns the whole field/value map.</summary>
        Task<ConcurrentDictionary<string, string>> GetAsync();

        /// <summary>Returns the full state object (map plus version counter).</summary>
        Task<HashState> GetStateAsync();

        /// <summary>Returns the current value of the state's version counter.</summary>
        Task<int> GetVersionAsync();

        // ---- Writes ----

        /// <summary>Stores a value under <paramref name="field"/>, adding or replacing it.</summary>
        /// <param name="field">Name of the field to write.</param>
        /// <param name="bytes">Value to store for the field.</param>
        Task SetAsync(string field, string bytes);

        /// <summary>Removes a single field from the hash.</summary>
        /// <param name="field">Name of the field to remove.</param>
        Task RemoveAsync(string field);

        /// <summary>Removes the entire hash.</summary>
        Task RemoveAsync();
    }

    /// <summary>
    /// Grain that keeps a string-to-string hash in memory and mirrors every change to the
    /// <c>ClusterCacheHashTable</c> SQL table, one row per (key, field) pair, where the key is the
    /// grain's string primary key.
    /// </summary>
    /// <remarks>
    /// The class is both the grain and an <see cref="IGrainStorage"/> implementation. The
    /// <see cref="StorageProviderAttribute"/> names the provider "HashStorageService"; presumably this
    /// same class is registered under that name in the silo configuration (not visible here — confirm).
    /// Writes are persisted per field by the grain methods, which is why the provider-level
    /// <see cref="WriteStateAsync(string, GrainReference, IGrainState)"/> does nothing.
    /// </remarks>
    [StorageProvider(ProviderName = "HashStorageService")]
    public class HashStorageService : Grain<HashState>, IHashStorageService, IGrainStorage
    {
        readonly AdoNetOptions options;
        string connection;

        /// <summary>Captures the options and resolves the SQL connection string from them.</summary>
        /// <param name="options">ADO.NET options that supply the connection string.</param>
        public HashStorageService(AdoNetOptions options)
        {
            this.options = options;
            this.connection = this.options.GetConnection();
        }

        /// <summary>Returns the in-memory field/value map (may be null if the state has no map).</summary>
        public Task<ConcurrentDictionary<string, string>> GetAsync()
        {
            return Task.FromResult(this.State.keyValuePairs);
        }

        /// <summary>Returns the value stored under <paramref name="field"/>.</summary>
        /// <param name="field">Field to look up.</param>
        /// <returns>
        /// The stored value; <c>null</c> when the map exists but has no such field; an empty string
        /// when the state has no map at all.
        /// </returns>
        public Task<string> GetAsync(string field)
        {
            if (this.State.keyValuePairs == null)
            {
                return Task.FromResult(string.Empty);
            }

            this.State.keyValuePairs.TryGetValue(field, out string payload);
            return Task.FromResult(payload);
        }

        /// <summary>
        /// Stores <paramref name="payload"/> under <paramref name="field"/>. Issues an UPDATE when the
        /// field is already cached with a different value, an INSERT when it is new, and no SQL at all
        /// when the cached value is already equal. The version counter is incremented on every
        /// successful call.
        /// </summary>
        /// <param name="field">Field to write.</param>
        /// <param name="payload">Value to store.</param>
        public async Task SetAsync(string field, string payload)
        {
            var map = this.State.keyValuePairs ??= new ConcurrentDictionary<string, string>();

            // Persist first, then touch the cache: if the SQL statement throws, the in-memory map
            // still matches what is in the table.
            if (map.TryGetValue(field, out string current))
            {
                if (current != payload)
                {
                    await this.WriteStateAsync(field, payload, true);
                    map[field] = payload;
                }
            }
            else
            {
                await this.WriteStateAsync(field, payload, false);
                map.TryAdd(field, payload);
            }

            this.State.Version++;
        }

        /// <summary>
        /// Removes <paramref name="field"/> from this hash, both in the table and in memory, and
        /// increments the version counter. When the state has no map only the counter changes.
        /// </summary>
        /// <param name="field">Field to remove.</param>
        public async Task RemoveAsync(string field)
        {
            if (this.State.keyValuePairs != null)
            {
                // Delete the row before dropping the cached entry so a failed delete leaves both in step.
                await this.ClearStateAsync(field);
                this.State.keyValuePairs.TryRemove(field, out _);
            }

            this.State.Version++;
        }

        /// <summary>
        /// Removes the whole hash: clears the grain's persisted state and asks the runtime to
        /// deactivate this activation once idle. Does nothing when the state has no map.
        /// </summary>
        public async Task RemoveAsync()
        {
            if (this.State.keyValuePairs != null)
            {
                await this.ClearStateAsync();
                this.DeactivateOnIdle();
            }
        }

        /// <summary>Deletes the row for <paramref name="field"/> that belongs to this grain's key.</summary>
        /// <param name="field">Field whose row is deleted.</param>
        public Task ClearStateAsync(string field)
        {
            // The [key] filter is essential: without it the same field name would be deleted
            // from every hash stored in the table.
            return this.ExecuteAsync(
                "DELETE ClusterCacheHashTable where [key] = @key and [field] = @field",
                new SqlParameter("key", this.GetPrimaryKeyString()),
                new SqlParameter("field", field));
        }

        /// <summary>
        /// Storage-provider entry point: deletes every row stored for the referenced grain's key.
        /// </summary>
        /// <param name="grainType">Unused.</param>
        /// <param name="grainReference">Grain whose string primary key selects the rows.</param>
        /// <param name="grainState">Unused; the in-memory state object is not modified here.</param>
        public Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            return this.ExecuteAsync(
                "DELETE ClusterCacheHashTable where [key] = @key",
                new SqlParameter("key", grainReference.GetPrimaryKeyString()));
        }

        /// <summary>
        /// Storage-provider entry point: loads all field/value rows for the referenced grain's key
        /// and installs them as a fresh <see cref="HashState"/> (whose version starts at its default).
        /// </summary>
        /// <param name="grainType">Unused.</param>
        /// <param name="grainReference">Grain whose string primary key selects the rows.</param>
        /// <param name="grainState">Receives the newly built state.</param>
        public async Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            const string read = "SELECT [field],[value] from ClusterCacheHashTable where [key] = @key";

            string key = grainReference.GetPrimaryKeyString();
            var keyValues = new ConcurrentDictionary<string, string>();

            await using (SqlConnection conn = new SqlConnection(this.connection))
            {
                await conn.OpenAsync();
                await using (SqlCommand command = new SqlCommand(read, conn))
                {
                    command.Parameters.Add(new SqlParameter("key", key));

                    await using (var reader = await command.ExecuteReaderAsync())
                    {
                        while (await reader.ReadAsync())
                        {
                            // A NULL/non-string field cannot be a dictionary key (TryAdd would throw),
                            // so such rows are skipped. A NULL value is kept as a null entry.
                            if (reader["field"] is string field)
                            {
                                keyValues.TryAdd(field, reader["value"] as string);
                            }
                        }
                    }
                }
            }

            grainState.State = new HashState
            {
                keyValuePairs = keyValues
            };
        }

        /// <summary>Writes one field of this grain's hash to the table.</summary>
        /// <param name="field">Field to write.</param>
        /// <param name="payload">Value to store.</param>
        /// <param name="isUpdate">
        /// <c>true</c> to UPDATE the existing (key, field) row; <c>false</c> to INSERT a new row.
        /// </param>
        public Task WriteStateAsync(string field, string payload, bool isUpdate)
        {
            string sql = isUpdate
                ? "update ClusterCacheHashTable set [value]=@value where [key]=@key and [field]=@field"
                : "insert into ClusterCacheHashTable([key],[field],[value]) values(@key,@field,@value)";

            // Both statements take the same three parameters.
            return this.ExecuteAsync(
                sql,
                new SqlParameter("key", this.GetPrimaryKeyString()),
                new SqlParameter("field", field),
                new SqlParameter("value", payload));
        }

        /// <summary>
        /// Storage-provider entry point: intentionally a no-op, because changes are persisted per
        /// field by <see cref="WriteStateAsync(string, string, bool)"/>.
        /// </summary>
        public Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            return Task.CompletedTask;
        }

        /// <summary>Returns the full in-memory state object.</summary>
        public Task<HashState> GetStateAsync()
        {
            return Task.FromResult(this.State);
        }

        /// <summary>Returns the current value of the state's version counter.</summary>
        public Task<int> GetVersionAsync()
        {
            return Task.FromResult(this.State.Version);
        }

        /// <summary>Opens a connection, runs one parameterized non-query statement, and disposes both.</summary>
        private async Task ExecuteAsync(string sql, params SqlParameter[] parameters)
        {
            await using (SqlConnection conn = new SqlConnection(this.connection))
            {
                await conn.OpenAsync();
                await using (SqlCommand command = new SqlCommand(sql, conn))
                {
                    command.Parameters.AddRange(parameters);
                    await command.ExecuteNonQueryAsync();
                }
            }
        }
    }


    /// <summary>
    /// State object held by <see cref="HashStorageService"/>: the field/value map plus a counter.
    /// </summary>
    public class HashState
    {
        /// <summary>
        /// Counter incremented by HashStorageService.SetAsync and RemoveAsync(field). The code in this
        /// file never writes it to the SQL table, and ReadStateAsync builds a new state without setting
        /// it, so it starts again from the default after every read.
        /// </summary>
        public int Version { get; set; }
        /// <summary>
        /// Field-to-value map. May be null; HashStorageService null-checks it before use and creates
        /// it on the first SetAsync if needed.
        /// </summary>
        public ConcurrentDictionary<string, string> keyValuePairs { get; set; }
    }
}
